package cn.icanci.loopstack.rec.admin.biz.thread;

import cn.hutool.core.collection.ConcurrentHashSet;
import cn.hutool.http.Method;
import cn.icanci.loopstack.rec.admin.biz.service.LockService;
import cn.icanci.loopstack.rec.admin.biz.service.RegisterService;
import cn.icanci.loopstack.rec.common.model.config.RegisterVO;
import cn.icanci.loopstack.rec.common.model.socket.PublishDTO;
import cn.icanci.loopstack.rec.common.model.socket.SocketMessage;
import cn.icanci.loopstack.rec.common.model.socket.UriConstant;
import cn.icanci.loopstack.rec.common.utils.GenRegisterKeyUtils;
import cn.icanci.loopstack.rec.engine.script.client.Client;
import cn.icanci.loopstack.rec.engine.script.client.RemoteException;
import cn.icanci.loopstack.rec.engine.script.client.http.HttpClientImpl;
import io.netty.util.internal.ThrowableUtil;

import java.util.Date;
import java.util.List;
import java.util.Set;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ScheduledThreadPoolExecutor;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.LockSupport;

import org.apache.commons.collections4.CollectionUtils;
import org.apache.commons.lang3.StringUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import com.google.common.collect.Maps;
import com.google.common.collect.Sets;

/**
 * Trigger thread: periodically sends a heartbeat to every registered client, marks
 * registrations whose last update is too old as deleted, and pushes cache-refresh
 * notifications to clients that were handed to {@link #trigger(List)}.
 *
 * <p>All state is static. {@link #setRegisterService(RegisterService)} and
 * {@link #setLockService(LockService)} must be called before {@link #start()}.
 *
 * @author icanci
 * @since 1.0 Created in 2022/11/22 22:11
 */
@SuppressWarnings("all")
public class TriggerThread {

    private static final Logger logger = LoggerFactory.getLogger(TriggerThread.class);

    /** Heartbeat URL template: http://{address}:{port} followed by {@code UriConstant.HEARTBEAT}. */
    private static final String HEARTBEAT_REQUEST_FORMAT = "http://%s:%s" + UriConstant.HEARTBEAT;

    /** Refresh URL template: http://{address}:{port} followed by {@code UriConstant.REFRESH}. */
    private static final String REFRESH_REQUEST_FORMAT = "http://%s:%s" + UriConstant.REFRESH;

    private static RegisterService registerService;

    private static LockService lockService;

    private static final int CORE_SIZE = Runtime.getRuntime().availableProcessors();

    /** A registration not updated for more than this many seconds is marked as deleted. */
    private static final int REGISTER_TIME_OUT = 90;

    private static final Client CLIENT = HttpClientImpl.getInstance();

    /**
     * Worker pool that performs the actual HTTP calls. The rejection handler throws, so every
     * caller of {@code commonPool.execute} must be prepared for a {@link RuntimeException}.
     */
    private static final ThreadPoolExecutor commonPool = new ThreadPoolExecutor(CORE_SIZE, //
        CORE_SIZE << 1, //
        60L, //
        TimeUnit.SECONDS, //
        new LinkedBlockingQueue<>(2000), //
        runnable -> new Thread(runnable, "TriggerThread Biz Pool-" + runnable.hashCode()), //
        (r, executor) -> {
            throw new RuntimeException("TriggerThread Biz Pool is EXHAUSTED!");
        });

    private static final ScheduledThreadPoolExecutor jobPool = new ScheduledThreadPoolExecutor(CORE_SIZE);

    private static final ScheduledThreadPoolExecutor timeoutPool = new ScheduledThreadPoolExecutor(CORE_SIZE);

    /** Registrations waiting for a cache-refresh notification. */
    private static final LinkedBlockingQueue<RegisterVO> triggerQueue = new LinkedBlockingQueue<>();

    public static void setRegisterService(RegisterService registerService) {
        TriggerThread.registerService = registerService;
    }

    public static void setLockService(LockService lockService) {
        TriggerThread.lockService = lockService;
    }

    /**
     * Queues the given registrations for a cache-refresh notification. The notification is
     * sent asynchronously by the dispatcher thread started in {@link #start()}.
     *
     * @param registers registrations to notify; {@code null} or empty is ignored
     */
    public static void trigger(List<RegisterVO> registers) {
        if (CollectionUtils.isEmpty(registers)) {
            return;
        }
        triggerQueue.addAll(registers);
    }

    /**
     * Starts the heartbeat job, the timeout job (both every 30 seconds, first run immediately)
     * and the daemon thread that dispatches queued refresh notifications.
     */
    public static void start() {
        // Heartbeat job, runs every 30 seconds.
        jobPool.scheduleAtFixedRate(new JobRunner(), 0, 30, TimeUnit.SECONDS);

        // Timeout handler, runs every 30 seconds.
        timeoutPool.scheduleAtFixedRate(new TimeOutRunner(), 0, 30, TimeUnit.SECONDS);

        // Thread that delivers refresh notifications to clients.
        Thread triggerQueueThread = new Thread(TriggerThread::dispatchRefreshLoop);
        triggerQueueThread.setDaemon(true);
        triggerQueueThread.start();
    }

    /**
     * Body of the dispatcher thread: every 3 seconds drains {@code triggerQueue} and hands each
     * registration to the worker pool. Exits when the thread is interrupted, because
     * {@code parkNanos} returns immediately while the interrupt flag is set and the loop would
     * otherwise spin.
     */
    private static void dispatchRefreshLoop() {
        while (!Thread.currentThread().isInterrupted()) {
            try {
                LockSupport.parkNanos(TimeUnit.SECONDS.toNanos(3));

                RegisterVO next;
                while ((next = triggerQueue.poll()) != null) {
                    final RegisterVO register = next;
                    // May throw if the pool is exhausted; handled by the catch below.
                    commonPool.execute(() -> refresh(register));
                }
            } catch (Throwable e) {
                logger.warn("[TriggerThread][start][Throwable] error message:{}", e.getMessage(), e);
            }
        }
    }

    /**
     * Sends a refresh request carrying the registration's domain to the client and, on success,
     * marks the registration as alive. Never throws.
     *
     * @param register registration whose client should refresh its cache
     */
    private static void refresh(RegisterVO register) {
        try {
            String reqUrl = String.format(REFRESH_REQUEST_FORMAT, register.getClientAddress(), register.getClientPort());

            PublishDTO publishDTO = new PublishDTO();
            publishDTO.setDomainCodes(Sets.newHashSet(register.getDomain()));

            Client.RpcRequest rpcRequest = new Client.RpcRequest(reqUrl, publishDTO, Maps.newHashMap(), Method.POST, 3, TimeUnit.SECONDS, 0);

            SocketMessage call = CLIENT.call(rpcRequest, SocketMessage.class);
            if (call == null) {
                logger.warn("[TriggerThread][start][refresh] error message:{}", "response is null");
            } else if (call.isSuccess()) {
                markAlive(register);
            } else {
                logger.warn("[TriggerThread][start][refresh] error message:{}", call.getContent());
            }
        } catch (Throwable ex) {
            logger.error("[TriggerThread][Run][Throwable] error message:{}", ex.getMessage(), ex);
        }
    }

    /** Clears the delete flag, stamps the current time and persists the registration. */
    private static void markAlive(RegisterVO register) {
        register.setIsDelete(0);
        register.setLastUpdateTime(new Date());
        registerService.save(register);
    }

    /** Scheduled job: submits one heartbeat task per registration. */
    private static class JobRunner implements Runnable {

        @Override
        public void run() {
            // Nothing may escape from here: an exception thrown by a task scheduled with
            // scheduleAtFixedRate suppresses all of its subsequent executions.
            try {
                List<RegisterVO> registerList = registerService.queryAll();
                if (CollectionUtils.isEmpty(registerList)) {
                    return;
                }
                // The same ip and port only needs one heartbeat per round, so every round
                // gets its own set of already-contacted URLs shared by its tasks.
                Set<String> contactedUrls = new ConcurrentHashSet<>();
                // Liveness probe.
                for (RegisterVO register : registerList) {
                    commonPool.execute(new HeartbeatRunner(register, contactedUrls));
                }
            } catch (Throwable ex) {
                logger.error("[JobRunner][Throwable] error message:{}", ex.getMessage(), ex);
            }
        }
    }

    /** Sends a single heartbeat for one registration while holding its lock. */
    private static class HeartbeatRunner implements Runnable {

        private final RegisterVO  register;

        /** URLs already contacted in the current heartbeat round. */
        private final Set<String> contactedUrls;

        public HeartbeatRunner(RegisterVO register, Set<String> contactedUrls) {
            this.register = register;
            this.contactedUrls = contactedUrls;
        }

        @Override
        public void run() {
            String key = null;
            String token = null;
            try {
                // Acquire the lock for this registration.
                key = GenRegisterKeyUtils.generateKey(register);
                token = lockService.acquire(key, 3000);
                if (StringUtils.isBlank(token)) {
                    return;
                }

                String address = register.getClientAddress();
                int port = register.getClientPort();
                String reqUrl = String.format(HEARTBEAT_REQUEST_FORMAT, address, port);

                // add() returns false when the URL is already present, which makes the
                // check-and-mark a single atomic step.
                if (!contactedUrls.add(reqUrl)) {
                    return;
                }

                Client.RpcRequest rpcRequest = new Client.RpcRequest(reqUrl, Maps.newHashMap(), Maps.newHashMap(), Method.POST, 3, TimeUnit.SECONDS, 0);

                SocketMessage call = CLIENT.call(rpcRequest, SocketMessage.class);
                if (call == null) {
                    logger.error("[HeartbeatRunner][run] error message:{}", "response is null");
                } else if (call.isSuccess()) {
                    markAlive(register);
                } else {
                    logger.error("[HeartbeatRunner][run] error message:{}", call.getContent());
                }
            } catch (RemoteException ex) {
                // Best effort: a failed heartbeat is only logged.
                logger.error("[HeartbeatRunner][RemoteException] error message:{}", ThrowableUtil.stackTraceToString(ex));
            } catch (Throwable ex) {
                // Best effort: a failed heartbeat is only logged.
                logger.error("[HeartbeatRunner][Throwable] error message:{}", ThrowableUtil.stackTraceToString(ex));
            } finally {
                // Only release a lock that was actually acquired.
                if (StringUtils.isNotBlank(token)) {
                    try {
                        lockService.release(key, token);
                    } catch (Throwable ex) {
                        logger.error("[HeartbeatRunner][release] error message:{}", ex.getMessage(), ex);
                    }
                }
            }
        }
    }

    /** Scheduled job: marks registrations that have not been updated in time as deleted. */
    private static class TimeOutRunner implements Runnable {

        @Override
        public void run() {
            // Nothing may escape from here: an exception thrown by a task scheduled with
            // scheduleAtFixedRate suppresses all of its subsequent executions.
            try {
                List<RegisterVO> registerList = registerService.queryAll();
                if (CollectionUtils.isEmpty(registerList)) {
                    return;
                }
                for (RegisterVO register : registerList) {
                    tryRemove(register);
                }
            } catch (Throwable ex) {
                logger.error("[TimeOutRunner][run][Throwable] error message:{}", ex.getMessage(), ex);
            }
        }

        /**
         * Marks the registration as deleted when its last update is more than
         * {@code REGISTER_TIME_OUT} whole seconds old. Failures are logged, never thrown.
         */
        private void tryRemove(RegisterVO register) {
            try {
                if (register.getLastUpdateTime() == null) {
                    // Without a timestamp the age cannot be computed; skip instead of failing.
                    logger.warn("[TimeOutRunner][tryRemove] error message:{}", "lastUpdateTime is null");
                    return;
                }
                long elapsedSeconds = (System.currentTimeMillis() - register.getLastUpdateTime().getTime()) / 1000;
                if (elapsedSeconds > REGISTER_TIME_OUT) {
                    register.setIsDelete(1);
                    register.setLastUpdateTime(new Date());
                    registerService.save(register);
                }
            } catch (Throwable ex) {
                // Best effort: a failure on one registration must not stop the sweep.
                logger.error("[TimeOutRunner][Throwable] error message:{}", ex.getMessage(), ex);
            }
        }
    }
}
